package mesosphere.marathon
package core.experimental.repository

import java.util.concurrent.TimeUnit

import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.StrictLogging
import mesosphere.marathon.core.storage.zookeeper.{AsyncCuratorBuilderFactory, AsyncCuratorBuilderSettings, ZooKeeperPersistenceStore}
import mesosphere.marathon.experimental.repository.TemplateRepository
import mesosphere.marathon.experimental.repository.TemplateRepositoryLike.Template
import mesosphere.marathon.metrics.Metrics
import mesosphere.marathon.metrics.dummy.DummyMetrics
import mesosphere.marathon.state.{AbsolutePathId, AppDefinition}
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.BoundedExponentialBackoffRetry
import org.apache.curator.x.async.api.CreateOption
import org.openjdk.jmh.annotations.{
  Benchmark,
  BenchmarkMode,
  Fork,
  Level,
  Measurement,
  Mode,
  OutputTimeUnit,
  Param,
  Scope,
  State,
  TearDown,
  Warmup
}
import org.openjdk.jmh.infra.Blackhole

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Random

/**
  * To run e.g. [[TemplateRepositoryBenchmark#create()]] benchmark execute from the console:
  * $ sbt "benchmark/clean" "benchmark/jmh:run mesosphere.marathon.core.experimental.repository.TemplateRepositoryBenchmark.create"
  *
  * Below are some interesting numbers (for reading and writing templates) generated by this benchmark on a 2018 MacBook Pro.
  *
  * --- Benchmarking results for [[TemplateRepository]]
  *
  * [info] Benchmark                           (size)   Mode  Cnt    Score     Error  Units
  * [info] TemplateRepositoryBenchmark.create     100  thrpt   15  908.827 ±  16.884  ops/s
  * [info] TemplateRepositoryBenchmark.create    1024  thrpt   15  720.073 ±  86.045  ops/s
  * [info] TemplateRepositoryBenchmark.create   10240  thrpt   15  224.020 ± 131.083  ops/s
  * [info] TemplateRepositoryBenchmark.create  102400  thrpt   15   63.256 ±  10.292  ops/s
  *
  * [info] Benchmark                         (size)   Mode  Cnt      Score       Error  Units
  * [info] TemplateRepositoryBenchmark.read     100  thrpt   15  65696.268 ±  1438.135  ops/s
  * [info] TemplateRepositoryBenchmark.read    1024  thrpt   15  51548.543 ± 16983.939  ops/s
  * [info] TemplateRepositoryBenchmark.read   10240  thrpt   15  26720.130 ±  3962.896  ops/s
  * [info] TemplateRepositoryBenchmark.read  102400  thrpt   15   5368.623 ±   228.691  ops/s
  *
  * --- Benchmarking results for the underlying [[ZooKeeperPersistenceStore]]
  *
  * [info] Benchmark                                   (num)  (size)   Mode  Cnt     Score      Error  Units
  * [info] ZooKeeperPersistenceStoreBenchmark.creates      1     100  thrpt    5  3646.962 ±  197.825  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.creates      1    1024  thrpt    5  2712.084 ±  110.160  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.creates      1   10240  thrpt    5   626.717 ±   47.731  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.creates      1  102400  thrpt    5    82.617 ±   31.220  ops/s
  *
  * [info] Benchmark                                (num)  (size)   Mode  Cnt     Score     Error  Units
  * [info] ZooKeeperPersistenceStoreBenchmark.read      1      10  thrpt   15  7727.117 ± 582.189  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.read      1     100  thrpt   15  8015.907 ± 186.650  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.read      1    1024  thrpt   15  7769.695 ± 514.147  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.read      1   10240  thrpt   15  7817.817 ±  43.574  ops/s
  * [info] ZooKeeperPersistenceStoreBenchmark.read      1  102400  thrpt   15  4433.334 ± 169.564  ops/s
  *
  */
@State(Scope.Benchmark)
object TemplateRepositoryBenchmark extends StrictLogging {

  // Akka infrastructure shared by the benchmarks and by the populate stream in `main`.
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  // ZooKeeper connection settings; verified up front via `verify()` before any value is read.
  object Conf extends ZookeeperConf
  Conf.verify()

  // Curator client built from the configured ZooKeeper URL, timeouts and retry policy. It is started
  // as part of object initialization and closed by the benchmark tear-down or at the end of `main`.
  val curator: CuratorFramework = CuratorFrameworkFactory.newClient(
    Conf.zooKeeperUrl().hostsString,
    Conf.zooKeeperSessionTimeout().toInt,
    Conf.zooKeeperConnectionTimeout().toInt,
    new BoundedExponentialBackoffRetry(
      Conf.zooKeeperOperationBaseRetrySleepMs(),
      Conf.zooKeeperTimeout().toInt,
      Conf.zooKeeperOperationMaxRetries()
    )
  )
  curator.start()

  // Parent nodes are created on demand and node data is stored uncompressed.
  lazy val settings: AsyncCuratorBuilderSettings =
    new AsyncCuratorBuilderSettings(createOptions = Set(CreateOption.createParentsIfNeeded), compressedData = false)
  lazy val factory: AsyncCuratorBuilderFactory = AsyncCuratorBuilderFactory(curator, settings)
  lazy val metrics: Metrics = DummyMetrics
  lazy val store: ZooKeeperPersistenceStore = new ZooKeeperPersistenceStore(metrics, factory, parallelism = 16)

  /** The constant version reported by [[FixedVersionTemplateRepository]] for every template. */
  val APP_VERSION: String = "1"

  /**
    * A [[TemplateRepository]] that reports the constant [[APP_VERSION]] for every template.
    *
    * @param store the persistence store passed on to [[TemplateRepository]]
    * @param base the base path passed on to [[TemplateRepository]]; defaults to `/benchmark`
    */
  class FixedVersionTemplateRepository(store: ZooKeeperPersistenceStore, base: String = "/benchmark")
      extends TemplateRepository(store, base) {

    /**
      * An extra black hole to prevent JIT optimization of the unused template version (see below).
      */
    val hole = new Blackhole("Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.")

    /**
      * To make benchmarking easier we override [[mesosphere.marathon.experimental.repository.TemplateRepositoryLike.version()]]
      * so that it returns a constant string for any template, which makes reading templates back easier. We still
      * call the super method to keep the benchmarking results valid.
      *
      * @param template the template whose version is requested
      * @return always [[APP_VERSION]]
      */
    override def version(template: Template[_]): String = {
      hole.consume(super.version(template)) // Let's hope JVM will not optimize this call away
      APP_VERSION
    }
  }

  val repository: FixedVersionTemplateRepository = new FixedVersionTemplateRepository(store)
  // Initialize the very instance used by the benchmarks and by `main`. Previously a second, throwaway
  // repository was initialized here, which left `repository` itself uninitialized.
  Await.result(repository.initialize(), Duration.Inf)

  val random: Random = new Random()

  /** Returns `/$prefix/sleep-XXXXXXXX` where the suffix consists of 8 random alphanumeric characters. */
  def randomPathId(prefix: String): AbsolutePathId =
    AbsolutePathId(s"/$prefix/sleep-${random.alphanumeric.take(8).mkString}")

  /** Returns `/$prefix/sleep-N` where N is a random number in the range `[1, maxRange]`. */
  def randomRangedPathId(prefix: String, maxRange: Int): AbsolutePathId =
    AbsolutePathId(s"/$prefix/sleep-${random.nextInt(maxRange) + 1}")

  /**
    * Return an [[AppDefinition]] with given pathId. We simulate big app definitions by creating a label with a given value length.
    * The total size of the app definition will be approx. label size + 159 bytes where 159 bytes is the size of the serialized
    * app definition alone.
    * Note: the resulting ZK node size is ~25% smaller than calculated since protobuf is doing a decent job compressing the label value.
    *
    * @param pathId the id of the app definition
    * @param labelSize the length of the random alphanumeric value stored under the label `a`
    * @return an app definition with the given id, the role `someRole` and a single label
    */
  def appDef(pathId: AbsolutePathId, labelSize: Int = 1): AppDefinition =
    AppDefinition(
      id = pathId,
      role = "someRole",
      labels = Map("a" -> random.alphanumeric.take(labelSize).mkString)
    )

  /** Size of the node payload, i.e. the label length passed to [[appDef]]. */
  type NodeSize = Int

  /** Number of nodes of one particular size. */
  type NodeNumber = Int

  /**
    * A map of node size to number of nodes of that size. Used for read, update and delete benchmarks. The number of
    * nodes can be set per node size since e.g. creating 10K nodes with 1Mb data is not practical (it would result
    * in 10Gb of data); currently every size uses 10000 nodes.
    */
  val params: Map[NodeSize, NodeNumber] = Map(100 -> 10000, 1024 -> 10000, 10240 -> 10000, 102400 -> 10000)

  /**
    * Helper method to pre-populate Zookeeper with app definitions. Nodes are created using [[TemplateRepository.storePath]]
    * method which converts the app's path Id to the zookeeper path. App names are of the form `$NodeSize/sleep-N` so e.g.
    * an app with 1024 bytes payload size e.g. `/1024/sleep-1` is stored in `/templates/1024/sleep-1`.
    *
    * The curator client and the actor system are shut down once the method completes, regardless of whether
    * populating succeeded, so that a failure cannot leave them running.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Stores `num` app definitions named `/$size/sleep-1` .. `/$size/sleep-$num`, each carrying a label of `size` characters.
    def populate(size: Int, num: Int): Future[Done] =
      Source(1 to num)
        .map(i => appDef(AbsolutePathId(s"/$size/sleep-$i"), labelSize = size))
        .map(app => repository.toNode(app))
        .via(repository.store.createFlow)
        .runWith(Sink.ignore)

    try {
      Await.result(
        Source
          .fromIterator(() => params.iterator)
          // One size at a time: a batch is only started once the previous one has been stored completely.
          .mapAsync(1) {
            case (size, num) =>
              logger.info(s"Storing $num app definitions of ~$size bytes size")
              populate(size, num)
          }
          .runWith(Sink.ignore),
        Duration.Inf
      )

      logger.info("Zookeeper successfully populated with data")
    } finally {
      // Same shutdown sequence as the benchmark tear-down; nested so that a failing `close()` cannot
      // prevent the actor system from being terminated.
      try curator.close()
      finally Await.result(system.terminate(), Duration.Inf)
    }
  }
}

/**
  * JMH throughput benchmarks for the `repository` defined in the companion object. Every benchmark is run once per
  * configured node `size`.
  */
@Fork(1)
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@Warmup(iterations = 15, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 15, time = 1, timeUnit = TimeUnit.SECONDS)
class TemplateRepositoryBenchmark extends StrictLogging {

  import TemplateRepositoryBenchmark._

  /** Node data size; injected by JMH, one run per listed value. */
  @Param(value = Array("100", "1024", "10240", "102400"))
  var size: Int = _

  /** Creates a template with a random path id below `/$size` and a label of `size` characters, waiting for the result. */
  @Benchmark
  def create(hole: Blackhole) = {
    val template = appDef(randomPathId(size.toString), labelSize = size)
    val created = Await.result(repository.create(template), Duration.Inf)
    hole.consume(created)
  }

  /** Reads version [[APP_VERSION]] of a randomly picked template `/$size/sleep-N`, N being at most `params(size)`. */
  @Benchmark
  def read(hole: Blackhole) = {
    val pathId = randomRangedPathId(size.toString, params(size))
    val found = repository.read(AppDefinition(id = pathId, role = "someRole"), APP_VERSION)
    hole.consume(found)
  }

  /** Deletes a randomly picked template `/$size/sleep-N`, N being at most `params(size)`, waiting for the result. */
  @Benchmark
  def delete(hole: Blackhole) = {
    val pathId = randomRangedPathId(size.toString, params(size))
    val deleted = Await.result(repository.delete(pathId), Duration.Inf)
    hole.consume(deleted)
  }

  /** Closes the curator client and then waits for the actor system to terminate; runs once per trial. */
  @TearDown(Level.Trial)
  def close(): Unit = {
    curator.close()
    val terminated = system.terminate()
    Await.result(terminated, Duration.Inf)
  }
}
